# LocalStack Resource Provider Scaffolding v2
from __future__ import annotations

from pathlib import Path
from typing import TypedDict

import localstack.services.cloudformation.provider_utils as util
from localstack.services.cloudformation.resource_provider import (
    OperationStatus,
    ProgressEvent,
    ResourceProvider,
    ResourceRequest,
)


class DynamoDBTableProperties(TypedDict):
    KeySchema: list[KeySchema] | dict | None
    Arn: str | None
    AttributeDefinitions: list[AttributeDefinition] | None
    BillingMode: str | None
    ContributorInsightsSpecification: ContributorInsightsSpecification | None
    DeletionProtectionEnabled: bool | None
    GlobalSecondaryIndexes: list[GlobalSecondaryIndex] | None
    ImportSourceSpecification: ImportSourceSpecification | None
    KinesisStreamSpecification: KinesisStreamSpecification | None
    LocalSecondaryIndexes: list[LocalSecondaryIndex] | None
    PointInTimeRecoverySpecification: PointInTimeRecoverySpecification | None
    ProvisionedThroughput: ProvisionedThroughput | None
    SSESpecification: SSESpecification | None
    StreamArn: str | None
    StreamSpecification: StreamSpecification | None
    TableClass: str | None
    TableName: str | None
    Tags: list[Tag] | None
    TimeToLiveSpecification: TimeToLiveSpecification | None


class AttributeDefinition(TypedDict):
    AttributeName: str | None
    AttributeType: str | None


class KeySchema(TypedDict):
    AttributeName: str | None
    KeyType: str | None


class Projection(TypedDict):
    NonKeyAttributes: list[str] | None
    ProjectionType: str | None


class ProvisionedThroughput(TypedDict):
    ReadCapacityUnits: int | None
    WriteCapacityUnits: int | None


class ContributorInsightsSpecification(TypedDict):
    Enabled: bool | None


class GlobalSecondaryIndex(TypedDict):
    IndexName: str | None
    KeySchema: list[KeySchema] | None
    Projection: Projection | None
    ContributorInsightsSpecification: ContributorInsightsSpecification | None
    ProvisionedThroughput: ProvisionedThroughput | None


class LocalSecondaryIndex(TypedDict):
    IndexName: str | None
    KeySchema: list[KeySchema] | None
    Projection: Projection | None


class PointInTimeRecoverySpecification(TypedDict):
    PointInTimeRecoveryEnabled: bool | None


class SSESpecification(TypedDict):
    SSEEnabled: bool | None
    KMSMasterKeyId: str | None
    SSEType: str | None


class StreamSpecification(TypedDict):
    StreamViewType: str | None


class Tag(TypedDict):
    Key: str | None
    Value: str | None


class TimeToLiveSpecification(TypedDict):
    AttributeName: str | None
    Enabled: bool | None


class KinesisStreamSpecification(TypedDict):
    StreamArn: str | None


class S3BucketSource(TypedDict):
    S3Bucket: str | None
    S3BucketOwner: str | None
    S3KeyPrefix: str | None


class Csv(TypedDict):
    Delimiter: str | None
    HeaderList: list[str] | None


class InputFormatOptions(TypedDict):
    Csv: Csv | None


class ImportSourceSpecification(TypedDict):
    InputFormat: str | None
    S3BucketSource: S3BucketSource | None
    InputCompressionType: str | None
    InputFormatOptions: InputFormatOptions | None


REPEATED_INVOCATION = "repeated_invocation"


class DynamoDBTableProvider(ResourceProvider[DynamoDBTableProperties]):
    TYPE = "AWS::DynamoDB::Table"  # Autogenerated. Don't change
    SCHEMA = util.get_schema_path(Path(__file__))  # Autogenerated. Don't change

    def create(
        self,
        request: ResourceRequest[DynamoDBTableProperties],
    ) -> ProgressEvent[DynamoDBTableProperties]:
        """
        Create a new resource.

        Primary identifier fields:
          - /properties/TableName

        Required properties:
          - KeySchema

        Create-only properties:
          - /properties/TableName
          - /properties/ImportSourceSpecification

        Read-only properties:
          - /properties/Arn
          - /properties/StreamArn

        IAM permissions required:
          - dynamodb:CreateTable
          - dynamodb:DescribeImport
          - dynamodb:DescribeTable
          - dynamodb:DescribeTimeToLive
          - dynamodb:UpdateTimeToLive
          - dynamodb:UpdateContributorInsights
          - dynamodb:UpdateContinuousBackups
          - dynamodb:DescribeContinuousBackups
          - dynamodb:DescribeContributorInsights
          - dynamodb:EnableKinesisStreamingDestination
          - dynamodb:DisableKinesisStreamingDestination
          - dynamodb:DescribeKinesisStreamingDestination
          - dynamodb:ImportTable
          - dynamodb:ListTagsOfResource
          - dynamodb:TagResource
          - dynamodb:UpdateTable
          - kinesis:DescribeStream
          - kinesis:PutRecords
          - iam:CreateServiceLinkedRole
          - kms:CreateGrant
          - kms:Decrypt
          - kms:Describe*
          - kms:Encrypt
          - kms:Get*
          - kms:List*
          - kms:RevokeGrant
          - logs:CreateLogGroup
          - logs:CreateLogStream
          - logs:DescribeLogGroups
          - logs:DescribeLogStreams
          - logs:PutLogEvents
          - logs:PutRetentionPolicy
          - s3:GetObject
          - s3:GetObjectMetadata
          - s3:ListBucket

        """
        model = request.desired_state

        if not request.custom_context.get(REPEATED_INVOCATION):
            request.custom_context[REPEATED_INVOCATION] = True

            if not model.get("TableName"):
                model["TableName"] = util.generate_default_name(
                    request.stack_name, request.logical_resource_id
                )

            if model.get("ProvisionedThroughput"):
                model["ProvisionedThroughput"] = self.get_ddb_provisioned_throughput(model)

            if model.get("GlobalSecondaryIndexes"):
                model["GlobalSecondaryIndexes"] = self.get_ddb_global_sec_indexes(model)

            properties = [
                "TableName",
                "AttributeDefinitions",
                "KeySchema",
                "BillingMode",
                "ProvisionedThroughput",
                "LocalSecondaryIndexes",
                "GlobalSecondaryIndexes",
                "Tags",
                "SSESpecification",
            ]
            create_params = util.select_attributes(model, properties)

            if sse_specification := create_params.get("SSESpecification"):
                # rename bool attribute to fit boto call
                sse_specification["Enabled"] = sse_specification.pop("SSEEnabled")

            if stream_spec := model.get("StreamSpecification"):
                create_params["StreamSpecification"] = {
                    "StreamEnabled": True,
                    **(stream_spec or {}),
                }

            response = request.aws_client_factory.dynamodb.create_table(**create_params)
            model["Arn"] = response["TableDescription"]["TableArn"]

            if model.get("KinesisStreamSpecification"):
                request.aws_client_factory.dynamodb.enable_kinesis_streaming_destination(
                    **self.get_ddb_kinesis_stream_specification(model)
                )

            # add TTL config
            if ttl_config := model.get("TimeToLiveSpecification"):
                request.aws_client_factory.dynamodb.update_time_to_live(
                    TableName=model["TableName"], TimeToLiveSpecification=ttl_config
                )

            return ProgressEvent(
                status=OperationStatus.IN_PROGRESS,
                resource_model=model,
                custom_context=request.custom_context,
            )

        description = request.aws_client_factory.dynamodb.describe_table(
            TableName=model["TableName"]
        )

        if description["Table"]["TableStatus"] != "ACTIVE":
            return ProgressEvent(
                status=OperationStatus.IN_PROGRESS,
                resource_model=model,
                custom_context=request.custom_context,
            )

        if model.get("TimeToLiveSpecification"):
            request.aws_client_factory.dynamodb.update_time_to_live(
                TableName=model["TableName"],
                TimeToLiveSpecification=model["TimeToLiveSpecification"],
            )

        if description["Table"].get("LatestStreamArn"):
            model["StreamArn"] = description["Table"]["LatestStreamArn"]

        return ProgressEvent(
            status=OperationStatus.SUCCESS,
            resource_model=model,
        )

    def read(
        self,
        request: ResourceRequest[DynamoDBTableProperties],
    ) -> ProgressEvent[DynamoDBTableProperties]:
        """
        Fetch resource information

        IAM permissions required:
          - dynamodb:DescribeTable
          - dynamodb:DescribeContinuousBackups
          - dynamodb:DescribeContributorInsights
        """
        raise NotImplementedError

    def delete(
        self,
        request: ResourceRequest[DynamoDBTableProperties],
    ) -> ProgressEvent[DynamoDBTableProperties]:
        """
        Delete a resource

        IAM permissions required:
          - dynamodb:DeleteTable
          - dynamodb:DescribeTable
        """
        model = request.desired_state
        if not request.custom_context.get(REPEATED_INVOCATION):
            request.custom_context[REPEATED_INVOCATION] = True
            request.aws_client_factory.dynamodb.delete_table(TableName=model["TableName"])
            return ProgressEvent(
                status=OperationStatus.IN_PROGRESS,
                resource_model=model,
                custom_context=request.custom_context,
            )

        try:
            table_state = request.aws_client_factory.dynamodb.describe_table(
                TableName=model["TableName"]
            )

            match table_state["Table"]["TableStatus"]:
                case "DELETING":
                    return ProgressEvent(
                        status=OperationStatus.IN_PROGRESS,
                        resource_model=model,
                        custom_context=request.custom_context,
                    )
                case invalid_state:
                    return ProgressEvent(
                        status=OperationStatus.FAILED,
                        message=f"Table deletion failed. Table {model['TableName']} found in state {invalid_state}",  # TODO: not validated yet
                        resource_model={},
                    )
        except request.aws_client_factory.dynamodb.exceptions.TableNotFoundException:
            return ProgressEvent(
                status=OperationStatus.SUCCESS,
                resource_model={},
            )

    def update(
        self,
        request: ResourceRequest[DynamoDBTableProperties],
    ) -> ProgressEvent[DynamoDBTableProperties]:
        """
        Update a resource

        IAM permissions required:
          - dynamodb:UpdateTable
          - dynamodb:DescribeTable
          - dynamodb:DescribeTimeToLive
          - dynamodb:UpdateTimeToLive
          - dynamodb:UpdateContinuousBackups
          - dynamodb:UpdateContributorInsights
          - dynamodb:DescribeContinuousBackups
          - dynamodb:DescribeKinesisStreamingDestination
          - dynamodb:ListTagsOfResource
          - dynamodb:TagResource
          - dynamodb:UntagResource
          - dynamodb:DescribeContributorInsights
          - dynamodb:EnableKinesisStreamingDestination
          - dynamodb:DisableKinesisStreamingDestination
          - kinesis:DescribeStream
          - kinesis:PutRecords
          - iam:CreateServiceLinkedRole
          - kms:CreateGrant
          - kms:Describe*
          - kms:Get*
          - kms:List*
          - kms:RevokeGrant
        """
        raise NotImplementedError

    def get_ddb_provisioned_throughput(
        self,
        properties: dict,
    ) -> dict | None:
        # see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-dynamodb-table.html#cfn-dynamodb-table-provisionedthroughput
        args = properties.get("ProvisionedThroughput")
        if args == "AWS::NoValue":
            return None
        is_ondemand = properties.get("BillingMode") == "PAY_PER_REQUEST"
        # if the BillingMode is set to PAY_PER_REQUEST, you cannot specify ProvisionedThroughput
        # if the BillingMode is set to PROVISIONED (default), you have to specify ProvisionedThroughput

        if args is None:
            if is_ondemand:
                # do not return default value if it's on demand
                return

            # return default values if it's not on demand
            return {
                "ReadCapacityUnits": 5,
                "WriteCapacityUnits": 5,
            }

        if isinstance(args["ReadCapacityUnits"], str):
            args["ReadCapacityUnits"] = int(args["ReadCapacityUnits"])
        if isinstance(args["WriteCapacityUnits"], str):
            args["WriteCapacityUnits"] = int(args["WriteCapacityUnits"])

        return args

    def get_ddb_global_sec_indexes(
        self,
        properties: dict,
    ) -> list | None:
        args: list = properties.get("GlobalSecondaryIndexes")
        is_ondemand = properties.get("BillingMode") == "PAY_PER_REQUEST"
        if not args:
            return

        for index in args:
            # we ignore ContributorInsightsSpecification as not supported yet in DynamoDB and CloudWatch
            index.pop("ContributorInsightsSpecification", None)
            provisioned_throughput = index.get("ProvisionedThroughput")
            if is_ondemand and provisioned_throughput is None:
                pass  # optional for API calls
            elif provisioned_throughput is not None:
                # convert types
                if isinstance((read_units := provisioned_throughput["ReadCapacityUnits"]), str):
                    provisioned_throughput["ReadCapacityUnits"] = int(read_units)
                if isinstance((write_units := provisioned_throughput["WriteCapacityUnits"]), str):
                    provisioned_throughput["WriteCapacityUnits"] = int(write_units)
            else:
                raise Exception("Can't specify ProvisionedThroughput with PAY_PER_REQUEST")
        return args

    def get_ddb_kinesis_stream_specification(
        self,
        properties: dict,
    ) -> dict:
        args = properties.get("KinesisStreamSpecification")
        if args:
            args["TableName"] = properties["TableName"]
        return args

    def list(
        self,
        request: ResourceRequest[DynamoDBTableProperties],
    ) -> ProgressEvent[DynamoDBTableProperties]:
        resources = request.aws_client_factory.dynamodb.list_tables()
        return ProgressEvent(
            status=OperationStatus.SUCCESS,
            resource_models=[
                DynamoDBTableProperties(TableName=resource) for resource in resources["TableNames"]
            ],
        )
